package com.xzx.flink.streamapi.transform;

import com.xzx.flink.bean.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job showing a Flink {@link RichMapFunction}: each line of a text file is parsed into a
 * {@link SensorReading}, while the {@code open}/{@code close} lifecycle hooks print a marker to
 * standard output.
 *
 * <p>Usage: {@code Transform_09_RichMap [inputPath]}. When no argument is given, the original
 * development path ({@link #DEFAULT_INPUT_PATH}) is used.
 *
 * @author xinzhixuan
 * @version 1.0
 * @date 2021-08-24 11:11 PM
 */
public class Transform_09_RichMap {

    /** Fallback input file used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "/Users/xinzhixuan/work/git/flink-study-java/src/main/resources/sensor.txt";

    /** Number of space-separated fields read from each input line: id, timestamp, temperature. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional; {@code args[0]} overrides the input file path
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Allow the input file to be supplied by the caller instead of relying on a
        // machine-specific absolute path; fall back to the original location otherwise.
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

        DataStreamSource<String> streamSource = env.readTextFile(inputPath);
        SingleOutputStreamOperator<SensorReading> map = streamSource.map(new RichMapFunction<String, SensorReading>() {
            /**
             * Parses one line of the form {@code <id> <timestamp> <temperature>} (single-space
             * separated). Fields beyond the third are ignored.
             *
             * @param value raw input line
             * @return the reading built from the first three fields
             * @throws IllegalArgumentException if the line has fewer than three fields
             * @throws NumberFormatException if the timestamp or temperature is not numeric
             */
            @Override
            public SensorReading map(String value) throws Exception {
                String[] split = value.split(" ");
                if (split.length < REQUIRED_FIELDS) {
                    // Fail with the offending line rather than an opaque index error.
                    throw new IllegalArgumentException(
                            "Malformed sensor record, expected '<id> <timestamp> <temperature>': '" + value + "'");
                }
                return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
            }

            /** Lifecycle hook: prints the current thread name followed by "open". */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println(Thread.currentThread().getName() + "open");
            }

            /** Lifecycle hook: prints "close". */
            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("close");
            }
        });
        map.print();

        // The fully-qualified class name is used as the job name.
        env.execute(Transform_09_RichMap.class.getName());
    }
}
